package com.hzzftech.watchdog.busi.core.task;

import com.hzzftech.watchdog.busi.constants.BusiConstant;
import com.hzzftech.watchdog.busi.core.dance.JobChangeState;
import com.hzzftech.watchdog.busi.core.executor.DispatcherStatus;
import com.hzzftech.watchdog.busi.core.util.CarteClient;
import com.hzzftech.watchdog.busi.domain.*;
import com.hzzftech.watchdog.busi.module.dingding.service.DingService;
import com.hzzftech.watchdog.busi.module.email.service.IEmailService;
import com.hzzftech.watchdog.busi.service.*;
import com.hzzftech.watchdog.common.core.domain.entity.SysUser;
import com.hzzftech.watchdog.common.utils.StringUtils;
import com.hzzftech.watchdog.common.utils.spring.SpringUtils;
import com.hzzftech.watchdog.common.utils.uuid.IdUtils;
import com.hzzftech.watchdog.system.service.ISysUserService;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.util.EntityUtils;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.step.StepStatus;
import org.pentaho.di.www.SlaveServerJobStatus;
import org.pentaho.di.www.SlaveServerTransStatus;
import org.pentaho.di.www.WebResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Component("executeLogTask")
public class ExecuteLogTask {

    public static Logger logger = LoggerFactory.getLogger(ExecuteLogTask.class);

    @Autowired
    private IKtExecutionLogService executionLogService;

    @Autowired
    private IKtRepositoryNodeService nodeService;

    @Autowired
    private IKtDispatcherStepsService disStepsService;

    public void editLogStatus() {
        logger.debug("start editLogStatus");
        KtExecutionLog executionLog = new KtExecutionLog();
        executionLog.setStatus(DispatcherStatus.STATUS_RUNNING.getStatus()+"");
        List<KtExecutionLog> ktExecutionLogs = executionLogService.selectKtExecutionLogList(executionLog);

        if (logger.isDebugEnabled()) {
            logger.debug("日志定时任务，正在执行的任务个数[{}]", ktExecutionLogs.size());
        }

        if (CollectionUtils.isNotEmpty(ktExecutionLogs)) {
            Map<String, KtRepositoryNode> nodeMap = getNodeMap();
            ktExecutionLogs.forEach(e -> {
                editRnExecuteLog(e, nodeMap);
            });
        }

    }

    private void editRnExecuteLog(KtExecutionLog executionLog, Map<String, KtRepositoryNode> nodeMap) {
        KtRepositoryNode node = nodeMap.get(String.valueOf(executionLog.getSlaveId()));
        CarteClient carteClient = new CarteClient(node);
        StringBuilder urlBuffer = new StringBuilder();
        String path = CarteClient.JOB_STATUS;
        if (executionLog.getTaskType().equals(BusiConstant.KETTLE_TYPE_FLAG_TRANS)) {
            path = CarteClient.TRANS_STATUS;
        }
        urlBuffer.append(carteClient.getHttpUrl())
                .append(path)
                .append("/?xml=Y&name=")
                .append(executionLog.getTaskName())
                .append("&id=")
                .append(executionLog.getCarteId())
                .append("&from=0");
        try {
            HttpResponse response = carteClient.doGetResponse(urlBuffer.toString());
            logger.info("远程连接 job request url [{}], http status [{}]", urlBuffer.toString(), response.getStatusLine().getStatusCode());
            String content = EntityUtils.toString(response.getEntity());
            if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK && StringUtils.isNotEmpty(content)) {
                if (executionLog.getTaskType().equals(BusiConstant.KETTLE_TYPE_FLAG_TRANS)) {
                    updateTransLog(content, executionLog, node);
                } else {
                    updateJobLog(content, executionLog, node);
                }
            } else {
                executionLog.setStatus(DispatcherStatus.STATUS_FAIL.getStatus()+"");
                executionLog.setEndTime(new Date());
            }
        } catch (Exception e) {
            String errorMsg = "";
            if (e instanceof IOException) {
                errorMsg = "错误信息："+ ExceptionUtils.getStackTrace(e);
                executionLog.setRemark(errorMsg);
                logger.error("查找任务执行日志失败",e);
            } else if (e instanceof KettleException){
                errorMsg = "错误信息："+((KettleException)e).getSuperMessage();
                executionLog.setRemark(errorMsg);
                logger.error("查找任务执行日志失败",e);
            }
        } finally {
            executionLogService.updateKtExecutionLog(executionLog);

            if ((DispatcherStatus.STATUS_FAIL.getStatus()+"").equals(executionLog.getStatus())) {
                // 发送错误提示
                sendNoticeMsg(executionLog);
            }

            JobChangeState.instance.jobStateChange();
        }
    }

    @Autowired
    private IEmailService emailService;

    @Autowired
    private DingService dingService;

    @Autowired
    private IKtWarnMsgService warnMsgService;

    @Autowired
    private IKtDispatcherService dispatcherService;

    // 任务执行失败后发送警告信息
    private void sendNoticeMsg(KtExecutionLog executionLog) {
        KtDispatcher ktDispatcher = dispatcherService.selectKtDispatcherById(executionLog.getTaskId(), executionLog.getDpRepoType());
//        if (StringUtils.isNotEmpty(ktDispatcher.getWarnMsgIds())) {
//            String notiMsg = executionLog.getExecutionLog();
//            if (StringUtils.isEmpty(notiMsg)) {
//                notiMsg = "您的kettle任务出现异常，请登录后台查看错误信息！";
//            }
//            if (notiMsg.length() > 10000) {
//                notiMsg = executionLog.getExecutionLog().substring(0, 10000) + "\n......";
//            }
//            for (String warnId : ktDispatcher.getWarnMsgIds().split(",")) {
//                KtWarnMsg ktWarnMsg = warnMsgService.selectKtWarnMsgById(Long.parseLong(warnId));
//                if (ktWarnMsg.getStatus().equals(BusiConstant.STATUS_YES)) {
//                    if (ktWarnMsg.getContactType().equals(BusiConstant.CONTACT_TYPE_PHONE)) {
//                        // TODO 短信发送窗口
//                    } else if (ktWarnMsg.getContactType().equals(BusiConstant.CONTACT_TYPE_EMAIL)) {
//                        emailService.sendAttachMail(ktWarnMsg.geteMailAddr(), BusiConstant.CONTACT_HEADER_MSG, notiMsg);
//                    } else if (ktWarnMsg.getContactType().equals(BusiConstant.CONTACT_TYPE_DINGDING)) {
//                        dingService.sendDingTalk(notiMsg);
//                    }
//                }
//            }
//        }
//        IKtDispatcherFailedMsgService failedMsgService = SpringUtils.getBean(IKtDispatcherFailedMsgService.class);
//        KtDispatcherFailedMsg msg = new KtDispatcherFailedMsg();
//        msg.setDpId(ktDispatcher.getDpId());
//        msg.setDpName(ktDispatcher.getDpName());
//        msg.setFailedTime(new Date());
//        msg.setSendTime(0L);
//        msg.setStatus(BusiConstant.STATUS_YES);
//        failedMsgService.insertKtDispatcherFailedMsg(msg);


        if (StringUtils.isNotEmpty(ktDispatcher.getWarnMsgIds() )) {
            IKtDispatcherFailedMsgService failedMsgService = SpringUtils.getBean(IKtDispatcherFailedMsgService.class);
            ISysUserService userService = SpringUtils.getBean(ISysUserService.class);
            for (String id : ktDispatcher.getWarnMsgIds().split(",")) {
                KtDispatcherFailedMsg msg = new KtDispatcherFailedMsg();
                msg.setDpId(ktDispatcher.getDpId());
                msg.setFailedTime(new Date());
                msg.setSendTime(0L);
                msg.setStatus(BusiConstant.STATUS_YES);
                msg.setDpName(ktDispatcher.getDpName());
                msg.setUserId(Long.parseLong(id));
                SysUser user = userService.selectUserById(Long.parseLong(id));
                msg.setUserName(user.getUserName());
                msg.setEmail(user.getEmail());
                msg.setPhone(user.getPhonenumber());
                failedMsgService.insertKtDispatcherFailedMsg(msg);
            }
        }
    }

    // 更新任务执行日志
    private void updateJobLog(String content, KtExecutionLog executionLog, KtRepositoryNode node) throws KettleException {
        WebResult webResult = WebResult.fromXMLString(content);
        if (webResult.getResult()!= null) {
            if (webResult.getResult().equals(WebResult.STRING_OK)) {
                executionLog.setStatus(DispatcherStatus.STATUS_SUCCESS.getStatus()+"");
            } else {
                executionLog.setStatus(DispatcherStatus.STATUS_FAIL.getStatus()+"");
                executionLog.setEndTime(new Date());
            }
        } else {
            SlaveServerJobStatus status = SlaveServerJobStatus.fromXML(content);
            logger.info("获取远程任务日志信息 cartedId={}, slaveId={}, hostName={}, currStatus={}",executionLog.getCarteId(),
                executionLog.getSlaveId(), node.getNodeHost(), status.isRunning());
            if (status.isFinished()) {
                updateDisStatus(executionLog, status.getStatusDescription());
            }
            executionLog.setExecutionLog(status.getLoggingString());
            executionLogService.updateKtExecutionLog(executionLog);
        }

    }

    // 更新执行步骤信息
    private void updateDisStatus(KtExecutionLog executionLog, String currentStatus) {
        if (currentStatus.equals(Trans.STRING_FINISHED)) {
            executionLog.setStatus(DispatcherStatus.STATUS_SUCCESS.getStatus() + "");
            executionLog.setEndTime(new Date());
        } else {
            executionLog.setStatus(DispatcherStatus.STATUS_FAIL.getStatus()+"");
            executionLog.setEndTime(new Date());
        }
    }

    // 更新trans执行日志
    private void updateTransLog(String content, KtExecutionLog executionLog, KtRepositoryNode node) throws KettleException {
        SlaveServerTransStatus transStatus = SlaveServerTransStatus.fromXML(content);
        logger.info("获取远程任务日志信息 cartedId={}, slaveId={}, hostName={}, currStatus={}",executionLog.getCarteId(),
                executionLog.getSlaveId(), node.getNodeHost(), transStatus.isRunning());
        getRnStepStatus(executionLog.getId(), transStatus);
        executionLog.setExecutionLog(transStatus.getLoggingString());
        if (transStatus.isFinished()) {
            updateDisStatus(executionLog,transStatus.getStatusDescription());
        }
        executionLogService.updateKtExecutionLog(executionLog);
    }

    // 运行状态获取
    private void getRnStepStatus(String logId, SlaveServerTransStatus transStatus) {
        disStepsService.deleteKtDispatcherStepsById(logId);
        List<StepStatus> stepStatusList = transStatus.getStepStatusList();
        if (CollectionUtils.isNotEmpty(stepStatusList)) {
            for (StepStatus status:
            stepStatusList) {
                KtDispatcherSteps steps = getKDisStepStatus(status);
                steps.setTaskId(logId);
                steps.setId(IdUtils.simpleUUID());
                disStepsService.insertKtDispatcherSteps(steps);
            }
        }
    }

    // 运行步骤获取
    private KtDispatcherSteps getKDisStepStatus(StepStatus status) {
        KtDispatcherSteps steps = new KtDispatcherSteps();
        steps.setCopy(Long.valueOf(String.valueOf(status.getCopy())));
        steps.setLinesRead(status.getLinesRead());
        steps.setStepName(status.getStepname());
        steps.setLinesWritten(status.getLinesWritten());
        steps.setLinesInput(status.getLinesInput());

        steps.setLinesOutput(status.getLinesOutput());
        steps.setLinesUpdated(status.getLinesUpdated());
        steps.setLinesRejected(status.getLinesRead());
        steps.setLogErrors(status.getErrors());
        steps.setStatusDescription(status.getStatusDescription());
        steps.setLogSeconds(new Double(status.getSeconds()).longValue());
        steps.setSpeed(status.getSpeed());
        steps.setLogPriortty(status.getPriority());
        steps.setStopped(status.isStopped() ? "N": "Y");
        steps.setPaused(status.isPaused() ? "N":"Y");
        return steps;
    }

    // 获取执行服务器
    private Map<String, KtRepositoryNode> getNodeMap() {
        Map<String, KtRepositoryNode> nodeMap = new HashMap<>();
        KtRepositoryNode node = new KtRepositoryNode();
        node.setStatus(BusiConstant.STATUS_YES);
        List<KtRepositoryNode> nodes = nodeService.selectKtRepositoryNodeList(node);
        if (CollectionUtils.isNotEmpty(nodes)) {
            nodes.forEach(e -> {
                nodeMap.put(String.valueOf(e.getNodeId()), e);
            });
        }
        return nodeMap;
    }
}
